package cn.doitedu.analy.groupon;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import utils.HbaseUtil;

/**
 * create "user_subject","groupon"
 */
public class GrouponSink implements SinkFunction<GrouponEntity> {
    @Override
    public void invoke(GrouponEntity value, Context context) throws Exception {
        String userId = value.getUserId();
        Integer times = value.getTimes();
        String timeStr = HbaseUtil.getData("user_subject", userId, "groupon", times + "");
        timeStr = timeStr==null?"0":timeStr;
        Integer preTimes = Integer.valueOf(timeStr);
        times += times + preTimes;
        HbaseUtil.putData("user_subject",userId,"groupon","times",times+"");

    }
}
